From ae0212521eeffb070742f34fb0cade814724fc09 Mon Sep 17 00:00:00 2001 From: "emellor@leeni.uk.xensource.com" Date: Thu, 8 Dec 2005 15:04:31 +0000 Subject: [PATCH] Strip huge piles of cruft from the connection infrastructure. We now actually block inside accept rather than using select to poll and then calling accept regardless of the outcome of the select call, and then failing because the socket is non-blocking. SocketClientConnection, SocketConnector, TCPClientConnection, TCPConnector, connectTCP, UnixClientConnection, UnixConnector, connectUnix have gone. loseConnection and stopListening and closeSocket (where they are needed) are now called close. startListening is now called listen. Closes bug #379. Relieves a weight from the shoulders of the universe. Signed-off-by: Ewan Mellor --- tools/python/xen/web/connection.py | 263 ++++------------------- tools/python/xen/web/protocol.py | 2 +- tools/python/xen/web/tcp.py | 50 +---- tools/python/xen/web/unix.py | 33 +-- tools/python/xen/xend/server/relocate.py | 13 +- 5 files changed, 51 insertions(+), 310 deletions(-) diff --git a/tools/python/xen/web/connection.py b/tools/python/xen/web/connection.py index 89de272db4..342e6a6f59 100644 --- a/tools/python/xen/web/connection.py +++ b/tools/python/xen/web/connection.py @@ -30,11 +30,8 @@ specifying what kind of socket they are. There are subclasses for TCP and unix-domain sockets (see tcp.py and unix.py). """ -"""We make sockets non-blocking so that operations like accept() -don't block. We also select on a timeout. Otherwise we have no way -of getting the threads to shutdown. -""" -SELECT_TIMEOUT = 2.0 +BUFFER_SIZE = 1024 + class SocketServerConnection: """An accepted connection to a server. @@ -45,73 +42,35 @@ class SocketServerConnection: self.protocol = protocol self.addr = addr self.server = server - self.buffer_n = 1024 - self.thread = None self.protocol.setTransport(self) + def run(self): - self.thread = threading.Thread(target=self.main) - self.thread.start() + threading.Thread(target=self.main).start() - def main(self): - while True: - if not self.thread: break - if self.select(): break - if not self.thread: break - data = self.read() - if data is None: continue - if data is True: break - if self.dataReceived(data): break - def select(self): - try: - select.select([self.sock], [], [], SELECT_TIMEOUT) - return False - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return False - else: - self.loseConnection(ex) - return True - - def read(self): + def main(self): try: - data = self.sock.recv(self.buffer_n) - if data == '': - self.loseConnection() - return True - return data - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return None - else: - self.loseConnection(ex) - return True + while True: + try: + data = self.sock.recv(BUFFER_SIZE) + if data == '': + break + if self.protocol.dataReceived(data): + break + except socket.error, ex: + if ex.args[0] not in (EWOULDBLOCK, EAGAIN, EINTR): + break + finally: + try: + self.sock.close() + except: + pass - def dataReceived(self, data): - try: - self.protocol.dataReceived(data) - except SystemExit: - raise - except Exception, ex: - self.loseConnection(ex) - return True - return False def write(self, data): self.sock.send(data) - def loseConnection(self, reason=None): - self.thread = None - self.closeSocket(reason) - - def closeSocket(self, reason): - try: - self.sock.close() - except SystemExit: - raise - except: - pass class SocketListener: """A server socket, running listen in a thread. @@ -126,192 +85,44 @@ class SocketListener: self.backlog = backlog self.thread = None + def createSocket(self): raise NotImplementedError() + def setCloExec(self): fcntl.fcntl(self.sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC) + def acceptConnection(self, sock, protocol, addr): return SocketServerConnection(sock, protocol, addr, self) - def startListening(self): + + def listen(self): if self.sock or self.thread: raise IOError("already listening") self.sock = self.createSocket() - self.sock.setblocking(0) self.sock.listen(self.backlog) self.run() - def stopListening(self, reason=None): - self.loseConnection(reason) def run(self): self.thread = threading.Thread(target=self.main) self.thread.start() - def main(self): - while True: - if not self.thread: break - if self.select(): break - if self.accept(): break - - def select(self): - try: - select.select([self.sock], [], [], SELECT_TIMEOUT) - return False - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return False - else: - self.loseConnection(ex) - return True - - def accept(self): - try: - (sock, addr) = self.sock.accept() - sock.setblocking(0) - return self.accepted(sock, addr) - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return False - else: - self.loseConnection(ex) - return True - - def accepted(self, sock, addr): - self.acceptConnection(sock, self.protocol_class(), addr).run() - return False - - def loseConnection(self, reason=None): - self.thread = None - self.closeSocket(reason) - - def closeSocket(self, reason): - try: - self.sock.close() - except SystemExit: - raise - except Exception, ex: - pass - - -class SocketClientConnection: - """A connection to a server from a client. - - Call connectionMade() on the protocol in a thread when connected. - It is completely up to the protocol what to do. - """ - - def __init__(self, connector): - self.addr = None - self.connector = connector - self.buffer_n = 1024 - - def createSocket (self): - raise NotImplementedError() - - def write(self, data): - if self.sock: - return self.sock.send(data) - else: - return 0 - - def connect(self, timeout): - #todo: run a timer to cancel on timeout? - try: - sock = self.createSocket() - sock.connect(self.addr) - self.sock = sock - self.protocol = self.connector.protocol_class() - self.protocol.setTransport(self) - except SystemExit: - raise - except Exception, ex: - self.connector.connectionFailed(ex) - return False - - self.thread = threading.Thread(target=self.main) - #self.thread.setDaemon(True) - self.thread.start() - return True def main(self): try: - # Call the protocol in a thread. - # Up to it what to do. - self.protocol.connectionMade(self.addr) - except SystemExit: - raise - except Exception, ex: - self.loseConnection(ex) - - def mainLoop(self): - # Something a protocol could call. - while True: - if not self.thread: break - if self.select(): break - if not self.thread: break - data = self.read() - if data is None: continue - if data is True: break - if self.dataReceived(data): break - - def select(self): - try: - select.select([self.sock], [], [], SELECT_TIMEOUT) - return False - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return False - else: - self.loseConnection(ex) - return True - - def read(self): - try: - data = self.sock.recv(self.buffer_n) - return data - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return None - else: - self.loseConnection(ex) - return True - - def dataReceived(self, data): - if not self.protocol: - return True - try: - self.protocol.dataReceived(data) - except SystemExit: - raise - except Exception, ex: - self.loseConnection(ex) - return True - return False - - def loseConnection(self, reason=None): - self.thread = None - self.closeSocket(reason) - - def closeSocket(self, reason): - try: - if self.sock: + while True: + try: + (sock, addr) = self.sock.accept() + self.acceptConnection(sock, self.protocol_class(), + addr).run() + except socket.error, ex: + if ex.args[0] not in (EWOULDBLOCK, EAGAIN, EINTR): + break + finally: + try: self.sock.close() - except SystemExit: - raise - except: - pass - -class SocketConnector: - """A client socket. Connects to a server and runs the client protocol - in a thread. - """ - - def __init__(self, protocol_class): - self.protocol_class = protocol_class - self.transport = None - - def connect(self): - pass + except: + pass diff --git a/tools/python/xen/web/protocol.py b/tools/python/xen/web/protocol.py index 54f44b0628..603973a3fd 100644 --- a/tools/python/xen/web/protocol.py +++ b/tools/python/xen/web/protocol.py @@ -25,7 +25,7 @@ class Protocol: self.transport = transport def dataReceived(self, data): - print 'Protocol>dataReceived>' + raise NotImplementedError() def write(self, data): if self.transport: diff --git a/tools/python/xen/web/tcp.py b/tools/python/xen/web/tcp.py index 674bf93f8c..f3d67a7de4 100644 --- a/tools/python/xen/web/tcp.py +++ b/tools/python/xen/web/tcp.py @@ -13,16 +13,16 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA #============================================================================ # Copyright (C) 2005 Mike Wray +# Copyright (C) 2005 XenSource Ltd. #============================================================================ -import sys + import socket -import types import time import errno from connection import * -from protocol import * + class TCPListener(SocketListener): @@ -52,48 +52,8 @@ class TCPListener(SocketListener): def acceptConnection(self, sock, protocol, addr): return SocketServerConnection(sock, protocol, addr, self) -class TCPClientConnection(SocketClientConnection): - - def __init__(self, host, port, bindAddress, connector): - SocketClientConnection.__init__(self, connector) - self.addr = (host, port) - self.bindAddress = bindAddress - - def createSocket(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if self.bindAddress is not None: - sock.bind(self.bindAddress) - return sock - -class TCPConnector(SocketConnector): - - def __init__(self, host, port, protocol, timeout=None, bindAddress=None): - SocketConnector.__init__(self, protocol) - self.host = host - self.port = self.servicePort(port) - self.bindAddress = bindAddress - self.timeout = timeout - - def servicePort(self, port): - if isinstance(port, types.StringTypes): - try: - port = socket.getservbyname(port, 'tcp') - except socket.error, ex: - raise IOError("unknown service: " + ex) - return port - - def connect(self): - self.transport = TCPClientConnection( - self.host, self.port, self.bindAddress, self) - self.transport.connect(self.timeout) def listenTCP(port, protocol, interface='', backlog=None): l = TCPListener(port, protocol, interface=interface, backlog=backlog) - l.startListening() - return l - -def connectTCP(host, port, protocol, timeout=None, bindAddress=None): - c = TCPConnector(host, port, protocol, timeout=timeout, - bindAddress=bindAddress) - c.connect() - return c + l.listen() + l.setCloExec() diff --git a/tools/python/xen/web/unix.py b/tools/python/xen/web/unix.py index 2d03b09260..64ae2fcf1b 100644 --- a/tools/python/xen/web/unix.py +++ b/tools/python/xen/web/unix.py @@ -16,13 +16,13 @@ # Copyright (C) 2005 XenSource Ltd. #============================================================================ -import sys + import socket import os import os.path from connection import * -from protocol import * + class UnixListener(SocketListener): @@ -48,33 +48,6 @@ class UnixListener(SocketListener): def acceptConnection(self, sock, protocol, addr): return SocketServerConnection(sock, protocol, self.path, self) -class UnixClientConnection(SocketClientConnection): - - def __init__(self, addr, connector): - SocketClientConnection.__init__(self, connector) - self.addr = addr - - def createSocket(self): - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - return sock - -class UnixConnector(SocketConnector): - - def __init__(self, path, protocol, timeout=None): - SocketConnector.__init__(self, protocol) - self.addr = path - self.timeout = timeout - - def connect(self): - self.transport = UnixClientConnection(self.addr, self) - self.transport.connect(self.timeout) def listenUNIX(path, protocol, backlog=None): - l = UnixListener(path, protocol, backlog=backlog) - l.startListening() - return l - -def connectUNIX(path, protocol, timeout=None): - c = UnixConnector(path, protocol, timeout=timeout) - c.connect() - return c + UnixListener(path, protocol, backlog=backlog).listen() diff --git a/tools/python/xen/xend/server/relocate.py b/tools/python/xen/xend/server/relocate.py index d4480f242e..c20d577156 100644 --- a/tools/python/xen/xend/server/relocate.py +++ b/tools/python/xen/xend/server/relocate.py @@ -44,15 +44,15 @@ class RelocationProtocol(protocol.Protocol): res = self.dispatch(val) self.send_result(res) if self.parser.at_eof(): - self.loseConnection() + self.close() except SystemExit: raise except: self.send_error() - def loseConnection(self): + def close(self): if self.transport: - self.transport.loseConnection() + self.transport.close() def send_reply(self, sxpr): io = StringIO.StringIO() @@ -100,15 +100,13 @@ class RelocationProtocol(protocol.Protocol): return l def op_quit(self, _1, _2): - self.loseConnection() + self.close() def op_receive(self, name, _): if self.transport: self.send_reply(["ready", name]) - self.transport.sock.setblocking(1) XendDomain.instance().domain_restore_fd( self.transport.sock.fileno()) - self.transport.sock.setblocking(0) else: log.error(name + ": no transport") raise XendError(name + ": no transport") @@ -122,5 +120,4 @@ def listenRelocation(): if xroot.get_xend_relocation_server(): port = xroot.get_xend_relocation_port() interface = xroot.get_xend_relocation_address() - l = tcp.listenTCP(port, RelocationProtocol, interface=interface) - l.setCloExec() + tcp.listenTCP(port, RelocationProtocol, interface=interface) -- 2.30.2